package com.ifast.face.service.run;

import java.awt.image.BufferedImage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.bytedeco.javacv.Frame;
import org.bytedeco.javacv.Java2DFrameConverter;

import com.ifast.face.service.FaceEngineService;

import cn.hutool.core.util.ReflectUtil;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class RtmpTurnRun implements Runnable{
	 
	private LinkedBlockingQueue<BufferedImage> takeQueue = new LinkedBlockingQueue<>(10);
	public ExecutorService executor = Executors.newFixedThreadPool(1);
	
	private FaceEngineService faceEngineService;
	private String fetchUrl;
	private String pushUrl;
	private String groupName;
	private boolean stop = true;
	private FaceRun faceRun;
	
	public RtmpTurnRun(FaceEngineService faceEngineService,String fetchUrl,String pushUrl,String groupName){
		this.faceEngineService = faceEngineService;
		this.fetchUrl = fetchUrl;
		this.pushUrl = pushUrl;
		this.groupName = groupName;
	}
	
	@Override
	public void run() {
		try { 
			pipeline().go(); 
		}catch (Exception e) { 
			e.printStackTrace();
			stop();
		}
	} 
	 
 	FrameGrabberAdapter grabber;
 	FrameRecorderAdapter recorder; 
 	
 	private RtmpTurnRun pipeline(){
 		FrameAdapter framea = new FrameAdapter(); 
		grabber = framea.from(fetchUrl);
		recorder = framea.to(pushUrl);  
		return this;
	} 
 	
    Java2DFrameConverter converter = new Java2DFrameConverter();
    public void go() throws Exception, org.bytedeco.javacv.FrameGrabber.Exception{ 
    	log.debug("-from="+fetchUrl+"=to="+pushUrl);  
		Frame img = null;
		BufferedImage bImg = null;
		int counts = 0;  
		//获取图片
		faceRun = new FaceRun(takeQueue,groupName,faceEngineService);
		executor.execute(faceRun); 
		//long startTime= System.nanoTime();
		//long videoTS=0;
		while (stop){  
			if((img = grabber.grab()) == null 
					|| (bImg = converter.convert(img)) == null) continue;
			//100取一帧
			if(counts == 100){
				counts = 0;
				takeQueue.offer(bImg); 
			} 
			counts++;  
			//videoTS = (System.nanoTime() - startTime) / 1000; 
			recorder.setTimestamp(img.timestamp);
			recorder.record(img);
			  
		}
		stop();
	} 
    
    final long awaitTime = 5 * 1000;  
    private void stop(){
//		log.debug("=============开始停止推送================");
		System.out.println("=============开始停止推送================"); 
    	ReflectUtil.setFieldValue(faceRun, "stop", false);
		
		try {  
			takeQueue.clear();
			recorder.close();
			grabber.close(); 
	        // 向学生传达“问题解答完毕后请举手示意！”  
		 	executor.shutdown();   
	        // 向学生传达“XX分之内解答不完的问题全部带回去作为课后作业！”后老师等待学生答题  
	        // (所有的任务都结束的时候，返回TRUE)  
	        if(!executor.awaitTermination(awaitTime, TimeUnit.MILLISECONDS)){  
	            // 超时的时候向线程池中所有的线程发出中断(interrupted)。  
	        	executor.shutdownNow();  
	        }  
	    } catch (Exception e) {  
	        // awaitTermination方法被中断的时候也中止线程池中全部的线程的执行。  
	        System.out.println("awaitTermination interrupted: " + e);  
	        executor.shutdownNow();  
	    }
//		log.debug("=============终止推送================");
		System.out.println("=============终止推送================"); 
    }
}
